理解 Node.js 中的 Stream 您所在的位置:网站首页 Nodejs 流stream 理解 Node.js 中的 Stream

理解 Node.js 中的 Stream

2023-05-11 11:23| 来源: 网络整理| 查看: 265

Node.js 中的流以难以使用而著称,甚至更难理解。

用 Dominic Tarr 的话来说:“stream 是 node 中最好用的也是最不易理解的存在。即使是 Redux 的创建者和 React.js 的核心团队成员 Dan Abramov 也害怕 node stream。

img

本文将帮助您理解 stream 以及如何使用它们。所以,不要害怕。follow me ! 🏖

什么是 streams ?

Stream 是支持 Node.js 应用程序的基本概念之一。它们是一种数据处理方法,用于依次读写数据的输入并输出。

stream 是一种有效地处理读/写文件、网络通信或任何类型的端到端信息交换的方法。

stream 的独特之处在于,它不像传统程序那样 一次性 将文件读入内存,而是逐块读取数据块,处理其内容而不会将其全部保存在内存中。

这使得 stream 在处理大量数据时非常强大。例如,文件大小可能大于空闲内存空间,因此不可能将整个文件读取到内存中以进行处理。这就是 stream 来拯救我们的地方!

使用 stream 来处理较小的数据块,使它可以读取整体较大的文件。

让我们以 Youtube 或 Netflix 等 ”流媒体“ 服务为例:这些服务不会让你一次性下载整个视频和音频。相反,你的浏览器会接收到连续不断的视频片段,让接收人立即开始观看。

然而,stream 不仅仅与媒体或大数据打交道。它们还为我们的代码提供了 ”可组合性“ 的能力。考虑到可组合性的设计意味着几个组件可以以某些方式组合,从而产生相同类型的结果。在 node.js 中,可以使用 stream 将数据传输到各个较小的代码片段中,它们可以自由携带与组合,从而编写出功能强大的代码片段。

stream 的优点

与其他处理数据方法相比,stream 提供了两个主要优势:

内存效率:在你处理数据之前,你不需要在内存中加载大量(或整个)数据 时间效率:一旦有了数据,就可以开始处理,这大大减少开始处理数据的时间,而不必等到整个数据加载完毕再进行处理。 node.js 中的 4 种 stream Writable:可写的数据流。例如,fs.createWriteStream() 允许我们使用流将数据写入文件。 Readable:可读的数据流。例如,fs.createReadStream() 允许我们读取文件内容。 Duplex:既可读又可写的双工流。 例如 net.Socket。 Transform:可以在读写数据时修改或转换数据的转换流。例如,在文件压缩的实例中(一个已压缩的文件),你可以在写入压缩的数据时,从文件中读取解压缩后的数据。

如果你使用过 nodejs,那么你可能遇到过 stream。例如,在基于 nodejs 的 http 服务器中,request 是可读流,response 是可写流。你可能已经使用了 fs 模块,它允许你处理可读和可写的文件流。在你使用 express,你都在使用流与客户端交互,同时,你使用的每个数据库连接驱动程序中都有应用流,因为 TCP 套接字,TLS 堆栈和其他连接都是基于 nodejs 的。

Example 1. 创建可读流 readable stream

我们首先引入 Readable,然后对它进行初始化。

const Stream = require('stream') const readableStream = new Stream.Readable() 复制代码

现在 stream 已经初始化,我们可以发送数据给它:

readableStream.push('ping!') readableStream.push('pong!') readableStream.push(null) 复制代码 Async iterator

强烈建议在处理流时使用 async iterator(异步迭代器)。根据 Dr. Axel Rauschmayer 的说法,异步迭代是一种异步检索数据容器内容的协议(意味着当前的 ”任务“ 可能会在检索项之前暂停)。同样,值得一提的是,stream async iterator 的实现使用的内部的 ”可读“ 事件。

你可以使用 async iterator 读取可读流:

import * as fs from 'fs'; async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk); // 执行一次,可以由 stream creator 来控制触发 data 事件频率,下文有提到 } } const readable = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); logChunks(readable); // Output: // 'This is a test!\n' 复制代码

它也可以收集可读流的内容到一个字符串:

import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; // 执行13次,可以由 stream creator 来控制触发 data 事件频率,下文有提到 } return result; } const readable = Readable.from('Good morning!', {encoding: 'utf8'}); assert.equal(await readableToString2(readable), 'Good morning!'); 复制代码

一定要记住,不要将 async 函数与 EventEmitter 混合使用,因为当前,当 reject 在事件处理程序中 emit 时,没有办法捕获它,这导致很难跟踪错误和内存泄漏。当前的最佳实践是始终将异步函数的内容包装在 try/catch 块中并处理错误,但这很容易出错。这个 pull request 旨在解决这个问题,可能已经合入到 node 的核心代码中了。

关于 node stream 的更多异步遍历解析,请阅读[这篇文章])(2ality.com/2019/11/nod…)

Readable.from(): 在迭代中创建可读流

stream.Readable.from(iterable, [options]) 它是一种常用方法,用于从迭代器中创建可读流,它保存 iterable 中包含的数据。Iterable 可以是同步迭代,也可以是异步迭代。参数选项是可选的,可以用来指定文本编码。

const { Readable } = require('stream'); async function * generate() { yield 'hello'; yield 'streams'; } const readable = Readable.from(generate()); readable.on('data', (chunk) => { console.log(chunk); }); 复制代码 两种读取模式

根据 Streams API,可读流有效地在两种模式中操作: flowing 和 paused。可读流可以是对象模式或非对象模式,而不管它是流模式还是暂停模式。

flowing mode,数据自动从系统底层读取,并通过 EventEmitter 接口事件尽可能快地提供给应用程序。

paused mode,stream.read() 方法必须显示调用,以从流中读取数据块。

在 flowing 模式中,要从流中读取数据,可以监听 data 事件并附加回调。当数据块可用时,可读流再发送一个 end 事件并执行回调。如下示例:

var fs = require("fs"); var data = ''; var readerStream = fs.createReadStream('file.txt'); //Create a readable stream readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. // Handle stream events --> data, end, and error readerStream.on('data', function(chunk) { data += chunk; }); readerStream.on('end',function() { console.log(data); }); readerStream.on('error', function(err) { console.log(err.stack); }); console.log("Program Ended"); 复制代码

fs.createReadStream() 创建了一个可读流。起初,stream 处于静止状态。**一旦监听 data 事件并附加回调,它就开始流动。**在那之后,数据块被读取并传递给你的回调。流实现者决定 data 事件触发的频率。例如, http 请求可能会在每读取几 kb 数据时就触发一个 data 事件。当你从文件读取数据时,你可以决定在读取一行后触发一个 data 事件。

当没有更多的数据要读取(到达 end )时,流触发一个 end 事件。在上面的代码片段中,我们监听这个事件,以便在到达终点时得到通知。

此外,如果有错误,流将触发 error 事件。

在 paused 模式中,你只需要在流实例上反复调用 read(),直到每个数据块都被读取,如下示例:

var fs = require('fs'); var readableStream = fs.createReadStream('file.txt'); var data = ''; var chunk; readableStream.on('readable', function() { while ((chunk=readableStream.read()) != null) { data += chunk; } }); readableStream.on('end', function() { console.log(data) }); 复制代码

read 函数的作用是:从内部缓冲区中读取一些数据并返回这些数据。当没有要读取的内容时,它返回 null。在 while 循环中,我们检查 null 值并终止循环。请注意,当可以从流中读取数据块时,将触发 readable 事件。

flowing 与 paused模式切换

所有可读流可以从 paused 模式开始,通过以下任意一种方式切换为 flowing 模式:

增加 'data' 事件. 调用 stream.resume() 方法。 调用 stream.pipe() 方法,发送数据到可写流。

可读流同样可以通过以下任一种方式切换回 paused 模式:

如果没有 pipe 的写入目标,通过调用 stream.pause() 方法。 如果有 pipe 的写入目标,则删除所有 pipe 的写入目标。可以通过调用 stream.unpipe() 方法来删除多个 pipe 目标。

需要记住的重要概念是,只有提供了使用或忽略数据的机制(事件),可读流才会生成数据。如果消费机制被禁用或取消,可读流将尝试停止生成数据。可读流可暂停,可继续,可通过 readable.read() 逐步读取数据。如果 “readable” 事件处理程序被删除,那么如果存在 “data” 事件处理程序,流将再次开始流动。

2. 创建可写流 writable stream

要将数据写入可写流,需要在流实例上调用 write()。如下示例:

var fs = require('fs'); var readableStream = fs.createReadStream('file1.txt'); var writableStream = fs.createWriteStream('file2.txt'); readableStream.setEncoding('utf8'); readableStream.on('data', function(chunk) { writableStream.write(chunk); }); 复制代码

上面的代码很简单。它只是从输入流中读取数据块,然后使用 write() 将数据写入目标。此函数返回一个布尔值,指示操作是否成功。如果为 true,那么写入操作成功,您可以继续写入更多数据。如果返回 false,则意味着出现了错误,此时您无法编写任何内容。可写流将通过触发 drain 事件让你知道何时可以继续开始写入数据。

调用 writable.end() 方法表明不会再向可写流写入数据。调用后,其可选的回调函数将作为 finish 事件触发。

// Write 'hello, ' and then end with 'world!'. const fs = require('fs'); const file = fs.createWriteStream('haha.md'); file.write('hello, '); file.end('world!'); // Writing more now is not allowed! file.on('finish', function () { console.log('finished') }) 复制代码

使用可写流,你可以从可读流读取数据:

const Stream = require('stream') const readableStream = new Stream.Readable() const writableStream = new Stream.Writable() writableStream._write = (chunk, encoding, next) => { console.log(chunk.toString()) next() } writableStream.on('finish', function () { console.log('finished') writableStream.end() }) readableStream.pipe(writableStream) readableStream.push('ping!') readableStream.push('pong!') readableStream.push(null) 复制代码

(推荐)你还可以使用异步迭代器来写入可写入流:

import * as util from 'util'; import * as stream from 'stream'; import * as fs from 'fs'; import {once} from 'events'; const finished = util.promisify(stream.finished); // (A) async function writeIterableToFile(iterable, filePath) { const writable = fs.createWriteStream(filePath, {encoding: 'utf8'}); for await (const chunk of iterable) { if (!writable.write(chunk)) { // (B) // Handle backpressure await once(writable, 'drain'); } } writable.end(); // (C) // Wait until done. Throws if there are errors. await finished(writable); } await writeIterableToFile( ['One', ' line of text.\n'], 'tmp/log.txt'); assert.equal( fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}), 'One line of text.\n'); 复制代码

finish() 的默认是基于回调的,但是可以通过 util.promisify() (第A行) 将其转换为基于 Promise 的版本。

在本例中,它使用了以下两种模式:

在处理写入错误时,等待 drain 事件(可写入)继续写入可写流 (行 B):

if (!writable.write(chunk)) { await once(writable, 'drain'); } 复制代码

写入完成后,关闭可写流(行 C):

writable.end(); await finished(writable); 复制代码 pipeline()

pipeline 是一种机制,我们将一个流的输出作为另一个流的输入。它通常用于从一个流获取数据,并将该流的输出传递给另一个流。管道操作没有限制。换句话说,管道用于在多个步骤中处理流数据。

⚠️注意!pipe 或 pipeline只有目标流为 Duplex 流或 Transform 流才可以形成管道链。文档

在 Node 10.x 引入了 stream.pipeline()。这是一个模块方法,用于在流之间进行管道传输,转发错误,适当清理,并在管道完成时提供回调。

如下就是使用 pipeline 的示例:

const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib'); // Use the pipeline API to easily pipe a series of streams // together and get notified when the pipeline is fully done. // A pipeline to gzip a potentially huge video file efficiently: pipeline( fs.createReadStream('The.Matrix.1080p.mkv'), zlib.createGzip(), fs.createWriteStream('The.Matrix.1080p.mkv.gz'), (err) => { if (err) { console.error('Pipeline failed', err); } else { console.log('Pipeline succeeded'); } } ); 复制代码

pipeline 应该替代 pipe ,因为 pipe 不安全。

Stream Module

nodejs stream 模块提供了构建所有流 api 的基础。

stream 模块是 Node.js 中默认提供的本地模块。stream 是 EventEmitter 类的一个实例,它在 Node 中异步地处理事件。因此,stream 本质上是基于事件的。

获取 stream 模块:

const stream = require('stream'); 复制代码

stream 模块对于创建新类型的流实例非常有用。通常没有必要使用 stream 模块来创建流,还有更常用的其他方式。

Streams APIs

由于其优势,许多 Node.js 核心模块提供了本地流处理能力,最明显的是:

net.Socket is the main node api that is stream are based on, which underlies most of the following APIs process.stdin returns a stream connected to stdin process.stdout returns a stream connected to stdout process.stderr returns a stream connected to stderr fs.createReadStream() creates a readable stream to a file fs.createWriteStream() creates a writable stream to a file net.connect() initiates a stream-based connection http.request() returns an instance of the http.ClientRequest class, which is a writable stream zlib.createGzip() compress data using gzip (a compression algorithm) into a stream zlib.createGunzip() decompress a gzip stream. zlib.createDeflate() compress data using deflate (a compression algorithm) into a stream zlib.createInflate() decompress a deflate stream Streams 清单

img

img

img

img

img

下面是与可写流相关的重要事件:

error – 触发表示在 写入/pipe 时发生了错误。 pipeline – 可读流通过 pipe 输送到可写流时,该事件由可写流发出。 unpipe – 当你在可读流上调用 unpipe 并阻止它 pipe 传输到目标流时触发。 总结

这都是关于 stream 的基础知识。streams、pieps和 chaining 是 Node.js 的核心和最强大的特性。 streams 确实可以帮助你编写整洁和高性能的代码来执行 I/O。

此外,还有一个值得关注的 Node.js 战略计划,名为 BOB,旨在改进 Node.js stream 数据接口,既包括在 Node.js 核心内部,也有希望成为未来的公共 api。

参考 Understanding Streams in Node.js 流的优秀文章 A Visual Guide to NodeJS Streams


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有